【Spring Cloud】Ribbon调用过程

负载均衡的实现方式

@RibbonClient

@LoadBalanced

@LoadBalanced实现负载均衡的代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
@SpringBootApplication
@EnableEurekaClient
@EnableDiscoveryClient
public class ServiceribbonApplication {

public static void main(String[] args) {
SpringApplication.run(ServiceribbonApplication.class, args);
}

@Bean
@LoadBalanced
RestTemplate restTemplate() {
return new RestTemplate();
}
}

restTemplate调用:

1
2
3
4
5
6
7
8
9
10
11
@Service
public class HelloService {

@Autowired
RestTemplate restTemplate;


public String helloService(String name){
return restTemplate.getForObject("http://SERVICE-HI/hello?name="+name,String.class);
}
}

@LoadBalanced注解的原理可参考:

胖波:Ribbon中@LoadBalanced注解的原理

Robbon负载均衡源码分析

RestTemplate

进入restTemplate的getForObject方法,进行一系列的调用,最终会进入到doExecute方法中,将请求信息封装为ClientHttpRequest对象,调用它的execute方法执行请求:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
public class RestTemplate extends InterceptingHttpAccessor implements RestOperations {
@Override

@Nullable
public <T> T getForObject(String url, Class<T> responseType, Object... uriVariables) throws RestClientException {
RequestCallback requestCallback = this.acceptHeaderRequestCallback(responseType);
HttpMessageConverterExtractor<T> responseExtractor = new HttpMessageConverterExtractor(responseType, this.getMessageConverters(), this.logger);
// 调用了execute方法
return this.execute(url, HttpMethod.GET, requestCallback, responseExtractor, (Object[])uriVariables);
}


@Nullable
public <T> T execute(String url, HttpMethod method, @Nullable RequestCallback requestCallback, @Nullable ResponseExtractor<T> responseExtractor, Object... uriVariables) throws RestClientException {
URI expanded = this.getUriTemplateHandler().expand(url, uriVariables);
// 调用doExecute方法
return this.doExecute(expanded, method, requestCallback, responseExtractor);
}

@Nullable
protected <T> T doExecute(URI url, @Nullable HttpMethod method, @Nullable RequestCallback requestCallback,
@Nullable ResponseExtractor<T> responseExtractor) throws RestClientException {

Assert.notNull(url, "URI is required");
Assert.notNull(method, "HttpMethod is required");
ClientHttpResponse response = null;
try {
// 封装请求
ClientHttpRequest request = createRequest(url, method);
if (requestCallback != null) {
requestCallback.doWithRequest(request);
}
// 调用execute方法执行请求
response = request.execute();
handleResponse(url, method, response);
return (responseExtractor != null ? responseExtractor.extractData(response) : null);
}
catch (IOException ex) {
String resource = url.toString();
String query = url.getRawQuery();
resource = (query != null ? resource.substring(0, resource.indexOf('?')) : resource);
throw new ResourceAccessException("I/O error on " + method.name() +
" request for \"" + resource + "\": " + ex.getMessage(), ex);
}
finally {
if (response != null) {
response.close();
}
}
}

}

HttpAccessor

调用createRequest方法是在HttpAccessor类中实现的,在该方法中调用了getRequestFactory获取RequestFactory,由于RestTemplate继承了InterceptingHttpAccessor,InterceptingHttpAccessor又继承了HttpAccessor,所以执行getRequestFactory的时候会进入到InterceptingHttpAccessor的getRequestFactory创建请求:

1
2
3
4
5
6
7
8
9
10
11
12
13
public abstract class HttpAccessor { 
// 默认是SimpleClientHttpRequestFactory
private ClientHttpRequestFactory requestFactory = new SimpleClientHttpRequestFactory();

// 使用工厂模式创建ClientHttpRequest
protected ClientHttpRequest createRequest(URI url, HttpMethod method) throws IOException {
ClientHttpRequest request = getRequestFactory().createRequest(url, method);
if (logger.isDebugEnabled()) {
logger.debug("HTTP " + method.name() + " " + url);
}
return request;
}
}

InterceptingHttpAccessor

在InterceptingHttpAccessor中的getRequestFactory中,会判断拦截器是否为空,如果不为空判断当前RequestFactory是否为空,如果为空将创建一个InterceptingClientHttpRequestFactory类型的工厂:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
public abstract class InterceptingHttpAccessor extends HttpAccessor {
@Override
public ClientHttpRequestFactory getRequestFactory() {
// 获取拦截器
List<ClientHttpRequestInterceptor> interceptors = getInterceptors();
// 如果拦截器不为空
if (!CollectionUtils.isEmpty(interceptors)) {
ClientHttpRequestFactory factory = this.interceptingRequestFactory;
// 如果factory为空
if (factory == null) {
// 创建InterceptingClientHttpRequestFactory
factory = new InterceptingClientHttpRequestFactory(super.getRequestFactory(), interceptors);
this.interceptingRequestFactory = factory;
}
return factory;
}
else {
return super.getRequestFactory();
}
}
}

InterceptingClientHttpRequestFactory

InterceptingClientHttpRequestFactory当然创建的是InterceptingClientHttpRequest类型的请求类,所以最终会进入到InterceptingClientHttpRequest的execute方法中:

1
2
3
4
5
6
7
8
9
10
public class InterceptingClientHttpRequestFactory extends AbstractClientHttpRequestFactoryWrapper {

private final List<ClientHttpRequestInterceptor> interceptors;

@Override
protected ClientHttpRequest createRequest(URI uri, HttpMethod httpMethod, ClientHttpRequestFactory requestFactory) {
return new InterceptingClientHttpRequest(requestFactory, this.interceptors, uri, httpMethod);
}

}

InterceptingClientHttpRequest

InterceptingClientHttpRequest继承了AbstractClientHttpRequest,它们都是ClientHttpRequest的子类,InterceptingClientHttpRequest中有一个内部类InterceptingRequestExecution,它有一个拦截器类型的ClientHttpRequestInterceptor迭代器iterator,还实现了execute方法,在execute方法中会获取拦截器,对请求进行拦截:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
class InterceptingClientHttpRequest extends AbstractBufferingClientHttpRequest {

private class InterceptingRequestExecution implements ClientHttpRequestExecution {

private final Iterator<ClientHttpRequestInterceptor> iterator;

public InterceptingRequestExecution() {
this.iterator = interceptors.iterator();
}

@Override
public ClientHttpResponse execute(HttpRequest request, byte[] body) throws IOException {
if (this.iterator.hasNext()) {
// 获取拦截器
ClientHttpRequestInterceptor nextInterceptor = this.iterator.next();
return nextInterceptor.intercept(request, body, this);
}
else {
HttpMethod method = request.getMethod();
Assert.state(method != null, "No standard HTTP method");
ClientHttpRequest delegate = requestFactory.createRequest(request.getURI(), method);
request.getHeaders().forEach((key, value) -> delegate.getHeaders().addAll(key, value));
if (body.length > 0) {
if (delegate instanceof StreamingHttpOutputMessage) {
StreamingHttpOutputMessage streamingOutputMessage = (StreamingHttpOutputMessage) delegate;
streamingOutputMessage.setBody(outputStream -> StreamUtils.copy(body, outputStream));
}
else {
StreamUtils.copy(body, delegate.getBody());
}
}
return delegate.execute();
}
}
}
}

ClientHttpRequestInterceptor的实现类如下:

LoadBalancerInterceptor

进入到LoadBalancerInterceptor拦截器中,查看拦截器的intercept方法,在该方法中调用了LoadBalancerClient的execute方法执行请求,也就是说请求会被负载均衡拦截器所拦截,然后调用负载均衡客户端 LoadBalancerClient的execute方法执行请求:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
public class LoadBalancerInterceptor implements ClientHttpRequestInterceptor {

// 负载均衡客户端
private LoadBalancerClient loadBalancer;

private LoadBalancerRequestFactory requestFactory;

public LoadBalancerInterceptor(LoadBalancerClient loadBalancer,
LoadBalancerRequestFactory requestFactory) {
this.loadBalancer = loadBalancer;
this.requestFactory = requestFactory;
}

public LoadBalancerInterceptor(LoadBalancerClient loadBalancer) {
// for backwards compatibility
this(loadBalancer, new LoadBalancerRequestFactory(loadBalancer));
}

@Override
public ClientHttpResponse intercept(final HttpRequest request, final byte[] body,
final ClientHttpRequestExecution execution) throws IOException {
final URI originalUri = request.getURI();
// 获取服务名称
String serviceName = originalUri.getHost();
Assert.state(serviceName != null,
"Request URI does not contain a valid hostname: " + originalUri);
// 调用负载均衡客户端的execute方法执行请求
return this.loadBalancer.execute(serviceName,
this.requestFactory.createRequest(request, body, execution));
}
}

RibbonLoadBalancerClient

LoadBalancerClient是一个接口,RibbonLoadBalancerClient实现了它,在execute方法中,首先调用getLoadBalancer方法获取LoadBalancer的实现类,选取一个负载均衡实现类,默认情况下会返回ZoneAwareLoadBalancer的负载均衡器,然后调用getServer(在此方法中又调用了chooseServer),根据负载均衡规则选取一个服务,之后就知道往哪个服务上发送请求,执行发送请求操作了:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
public class RibbonLoadBalancerClient implements LoadBalancerClient {


@Override
public <T> T execute(String serviceId, LoadBalancerRequest<T> request)
throws IOException {
return execute(serviceId, request, null);
}

public <T> T execute(String serviceId, LoadBalancerRequest<T> request, Object hint)
throws IOException {
// 获取LoadBalancer实现类
ILoadBalancer loadBalancer = getLoadBalancer(serviceId);
// 根据负载均衡规则选取一个服务
Server server = getServer(loadBalancer, hint);
if (server == null) {
throw new IllegalStateException("No instances available for " + serviceId);
}
RibbonServer ribbonServer = new RibbonServer(serviceId, server,
isSecure(server, serviceId),
serverIntrospector(serviceId).getMetadata(server));
// 调用execute方法执行请求
return execute(serviceId, ribbonServer, request);
}


protected ILoadBalancer getLoadBalancer(String serviceId) {
// 获取LoadBalancer实现类
return this.clientFactory.getLoadBalancer(serviceId);
}

protected Server getServer(ILoadBalancer loadBalancer, Object hint) {
if (loadBalancer == null) {
return null;
}
// 调用chooseServer选择一个服务
return loadBalancer.chooseServer(hint != null ? hint : "default");
}

// 执行发送请求的方法
@Override
public <T> T execute(String serviceId, ServiceInstance serviceInstance,
LoadBalancerRequest<T> request) throws IOException {
Server server = null;
if (serviceInstance instanceof RibbonServer) {
server = ((RibbonServer) serviceInstance).getServer();
}
if (server == null) {
throw new IllegalStateException("No instances available for " + serviceId);
}

RibbonLoadBalancerContext context = this.clientFactory
.getLoadBalancerContext(serviceId);
RibbonStatsRecorder statsRecorder = new RibbonStatsRecorder(context, server);

try {
// 执行请求
T returnVal = request.apply(serviceInstance);
statsRecorder.recordStats(returnVal);
return returnVal;
}
// catch IOException and rethrow so RestTemplate behaves correctly
catch (IOException ex) {
statsRecorder.recordStats(ex);
throw ex;
}
catch (Exception ex) {
statsRecorder.recordStats(ex);
ReflectionUtils.rethrowRuntimeException(ex);
}
return null;
}

}

负载均衡策略

ILoadBalancer

ILoadBalancer是一个接口:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
public interface ILoadBalancer {
void addServers(List<Server> var1);

Server chooseServer(Object var1);

void markServerDown(Server var1);

/** @deprecated */
@Deprecated
List<Server> getServerList(boolean var1);

List<Server> getReachableServers();

List<Server> getAllServers();
}

BaseLoadBalancer中实现了chooseServer方法,在BaseLoadBalancer中有一个默认的负载均衡策略是RoundRobinRule:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
public class BaseLoadBalancer extends AbstractLoadBalancer implements
PrimeConnections.PrimeConnectionListener, IClientConfigAware {

private final static IRule DEFAULT_RULE = new RoundRobinRule();
// 默认策略RoundRobinRule
protected IRule rule = DEFAULT_RULE;
public Server chooseServer(Object key) {
if (counter == null) {
counter = createCounter();
}
counter.increment();
if (rule == null) {
return null;
} else {
try {
// 选取服务
return rule.choose(key);
} catch (Exception e) {
logger.warn("LoadBalancer [{}]: Error choosing server for key {}", name, key, e);
return null;
}
}
}
}

七种负载均衡策略

  1. RoundRobinRule
  2. RandomRule
  3. AvailabilityFilteringRule
  4. WeightedResponseTimeRule
  5. RetryRule
  6. BestAvailableRule
  7. ZoneAvoidanceRule

参考:

【掘金小册】LinkedBear:SpringCloudNetflix 源码解读与原理分析

SpringBoot版本:2.1.9

SpringCloud版本: Greenwich.SR4